package org.databandtech.mysql2hive;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.DefaultRowFormatBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.databandtech.mysql2hive.source.SourceFromMySQLByTuple5;

public class MysqlToHDFS {

	// 数据库相关
	final static String URL = "jdbc:mysql://127.0.0.1:3307/databand?useUnicode=true&characterEncoding=utf-8&useSSL=false";
	final static String USER = "root";
	final static String PASS = "mysql";
	final static String SQL = "SELECT title,status,vid,cover_id,url from databand_video ORDER BY id DESC LIMIT 100";
	final static String[] COLUMNS_READ = new String[] { "title", "status", "vid", "cover_id", "url" };

	//指定分桶方式， 1：时间桶 ；  2：全局桶； 3：自定义桶
	final static int BUCKETTYPE = 1;
	//hdfs路径
	//final static String HDFSDIR = "/root/input"; //默认就是hadoop文件系统
	//final static String HDFSDIR = "hdfs://mynamenode:8020/root/input"; //指定namenode的写法
	final static String HDFSDIR = "file:///D:/input"; //写到本地文件系统，windows
	//final static String HDFSDIR = "file:///home/mydir"; //写到本地文件系统，linux
	

	public static void main(String[] args) {

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));

		// ######### 数据读取 #########
		DataStreamSource<Tuple5<String, Integer, String, String, String>> sourceStream = env
				.addSource(new SourceFromMySQLByTuple5(URL, USER, PASS, COLUMNS_READ, SQL));

		sourceStream.print();
		//Map装载
        SingleOutputStreamOperator<String> dataStream = sourceStream.map(new MapTransformation());
        
        //SequenceFile 格式
		//Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
		//final StreamingFileSink<Tuple2<LongWritable, Text>> sink = StreamingFileSink
		//          .forBulkFormat(
		//            outputBasePath,
		//            new SequenceFileWriterFactory<>(hadoopConf, LongWritable.class, Text.class))
		//.build();
        
        //Parquet 格式
        //final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        //		.forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
        //		.build();
        
        //行编码格式
		DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy
		        .create()
		        .withMaxPartSize(1024*1024*120) // 设置每个文件的最大大小 ,默认是128M。这里设置为120M
		        .withRolloverInterval(Long.MAX_VALUE) // 滚动写入新文件的时间，默认60s。这里设置为无限大
		        .withInactivityInterval(60*1000) // 60s空闲，就滚动写入新的文件
		        .build();
		
		//配置分部文件的前缀和后缀
		OutputFileConfig config = OutputFileConfig
				 .builder()
				 .withPartPrefix("myPrefix")
				 .withPartSuffix(".txt")
				 .build();
		
		DefaultRowFormatBuilder<String> sinkRowFormatBuilder = StreamingFileSink
		        .forRowFormat(new Path(HDFSDIR), new SimpleStringEncoder<String>("UTF-8"))
		        .withRollingPolicy(rollingPolicy)
		        //.withRollingPolicy(OnCheckpointRollingPolicy.build())
		        .withBucketCheckInterval(1000L) // 桶检查间隔，这里设置为1s	
		        .withOutputFileConfig(config); 	        
				
		if (BUCKETTYPE == 1) {
			sinkRowFormatBuilder.withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd--HH")); //按时间桶分配			
		}
		if (BUCKETTYPE == 2) {
			sinkRowFormatBuilder.withBucketAssigner(new BasePathBucketAssigner()); //将所有部分文件（part file）存储在单个全局桶	
		}
		
		StreamingFileSink<String> sink = sinkRowFormatBuilder.build();
				 
		dataStream.addSink(sink);

		try {
			env.execute("ok-hdfs");
		} catch (Exception e) {
			e.printStackTrace();
		}

	}
	
	public static class MapTransformation implements MapFunction<Tuple5<String,Integer,String,String,String>,String> {

		private static final long serialVersionUID = 1L;

		public String map(Tuple5<String,Integer,String,String,String> in) throws Exception {
			String result = in.f0 +","+in.f1 +","+in.f2 +","+in.f3 +","+in.f4 ;
			return result;
		}
	}

}
